package com.futvan.z.framework.core;

import com.futvan.z.framework.common.bean.Code;
import com.futvan.z.framework.common.bean.Result;
import com.futvan.z.framework.util.DBUtil;
import com.futvan.z.framework.util.DateUtil;
import com.futvan.z.framework.util.StringUtil;
import com.futvan.z.system.zdb.z_db;
import com.futvan.z.system.zdb.z_db_table_column;
import com.futvan.z.system.zetlin.z_etl_in;
import com.futvan.z.system.zetlin.z_etl_in_detail;
import com.futvan.z.system.zetlin.z_etl_in_log;
import org.mybatis.spring.SqlSessionTemplate;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.StatefulJob;
import org.springframework.core.NamedThreadLocal;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/**
 * Quartz job that runs one configured data-extraction (ETL) task.
 * <p>
 * The task is looked up in {@code z.etls} by the {@code etlid} entry of the job data map.
 * A run reads the rows returned by the task's SQL from the source database, optionally
 * clears the target table, inserts the rows into the target table inside one transaction,
 * and finally records the outcome (exactly once per run) through {@link #AddLog}.
 *
 * @author 42239
 */
public class EtlJob implements StatefulJob {

	/** Id of the core database; its connections come from the MyBatis data source registered in {@code z.dbs}. */
	private static final String CORE_DB_ID = "z";

	/** Mutable result of one run; its fields are exactly what gets written to the log tables. */
	private static final class Outcome {
		/** True only after the insert transaction has been committed. */
		boolean success = false;
		/** Human readable description of what happened. */
		String log = "";
		/** Number of rows written to the target table, as text. */
		String count = "0";
	}

	/**
	 * Entry point called by Quartz: runs the task named by the {@code etlid} job data entry
	 * and writes one log record describing the result.
	 *
	 * @param context Quartz execution context; its job data map must contain {@code etlid}
	 * @throws JobExecutionException declared by the Quartz contract; not thrown by this method itself
	 */
	public void execute(JobExecutionContext context) throws JobExecutionException {
		long startTime = System.currentTimeMillis();
		JobDataMap map = context.getJobDetail().getJobDataMap();
		String etlid = map.getString("etlid");

		Outcome outcome = run(etlid);

		// Single logging point: every path (validation failure, extraction failure, write
		// failure, success) ends up here, so a run never produces two log records.
		AddLog(etlid, startTime, outcome.log, outcome.count, outcome.success);
	}

	/**
	 * Validates the task configuration, extracts the source rows and hands them to the
	 * write phase.
	 *
	 * @param etlid id of the task to run, may be null
	 * @return the outcome to be logged, never null
	 */
	private Outcome run(String etlid) {
		Outcome outcome = new Outcome();

		if(!z.isNotNull(etlid)) {
			outcome.log = "数据抽取任务执行出错|etlid is null";
			return outcome;
		}
		z_etl_in etl = z.etls.get(etlid);
		if(!z.isNotNull(etl)) {
			outcome.log = "数据抽取任务执行出错|根据etlid未找到etl对象";
			return outcome;
		}
		List<z_etl_in_detail> detail_list = etl.getZ_etl_in_detail_list();
		if(!z.isNotNull(detail_list) || detail_list.isEmpty()) {
			outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|未定义字段映射关系";
			return outcome;
		}

		// Read the rows from the source database.
		Result SelectR = EtlData(etl);
		if(!Code.SUCCESS.equals(SelectR.getCode())) {
			outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|"+SelectR.getMsg();
			return outcome;
		}
		// EtlData always stores a List<HashMap<String, String>> on success.
		@SuppressWarnings("unchecked")
		List<HashMap<String, String>> list = (List<HashMap<String, String>>) SelectR.getData();
		if(!z.isNotNull(list) || list.isEmpty()) {
			outcome.log = "执行数据抽取任务：【"+etl.getName()+"】|从源数据库未抽取到可用数据";
			return outcome;
		}

		writeToTarget(etl, list, outcome);
		return outcome;
	}

	/**
	 * Writes the extracted rows to the target table in one transaction and fills in
	 * {@code outcome}. The transaction is committed only when every row was inserted;
	 * otherwise it is rolled back. The connection is always closed.
	 *
	 * @param etl     task definition
	 * @param list    extracted rows, not empty
	 * @param outcome result holder updated by this method
	 */
	private void writeToTarget(z_etl_in etl, List<HashMap<String, String>> list, Outcome outcome) {
		z_db to_db = z.dbsMap.get(etl.getTo_db());

		Connection conn;
		try {
			conn = openTargetConnection(to_db, etl);
		} catch (SQLException e) {
			// Keep the underlying reason instead of replacing it with the generic message.
			outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|获取目标数据库Connection出错|"+StringUtil.ExceptionToString(e);
			return;
		}
		if(!z.isNotNull(conn)) {
			outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|获取目标数据库Connection出错";
			return;
		}

		try {
			// Optionally empty the target table first.
			// NOTE(review): the clear runs and commits on its own connection, so a failed
			// insert below does not restore the deleted rows — confirm this is intended.
			DeleteToTableData(to_db, etl);

			conn.setAutoCommit(false);
			Result insertR = InsertData(etl, list, conn);
			if(Code.SUCCESS.equals(insertR.getCode())) {
				int insertCount = Integer.parseInt(insertR.getMsg());
				if(list.size() == insertCount) {
					// Commit before reporting success so a failed commit is logged as a failure.
					conn.commit();
					outcome.success = true;
					outcome.count = insertR.getMsg();
					outcome.log = "执行数据抽取任务：【"+etl.getName()+"】|成功抽取数据【"+insertR.getMsg()+"】条";
				}else {
					conn.rollback();
					outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|提取数据为【"+list.size()+"】条，插入数据为【"+insertR.getMsg()+"】条，提取数与插入数不相等。执行数据回滚操作。";
				}
			}else {
				conn.rollback();
				outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|"+insertR.getMsg();
			}
		} catch (Exception e) {
			outcome.log = "执行数据抽取任务出错：【"+etl.getName()+"】|写入数据库出现异常|"+StringUtil.ExceptionToString(e);
			try {
				conn.rollback();
			} catch (SQLException e1) {
				// Append rather than overwrite so the original failure stays visible.
				outcome.log = outcome.log+"|数据回滚出错";
			}
		} finally {
			closeQuietly(conn);
		}
	}

	/**
	 * Opens a connection to the target database of the task.
	 * For the core database the connection is taken from the MyBatis data source; for any
	 * other database it is created by {@code DBUtil.getConnection}. Only one connection is
	 * opened, so nothing is left unclosed.
	 *
	 * @param to_db target database definition (used for non-core databases)
	 * @param etl   task definition
	 * @return the connection; may be null if {@code DBUtil.getConnection} returns null
	 * @throws SQLException if the core data source cannot provide a connection
	 */
	private Connection openTargetConnection(z_db to_db, z_etl_in etl) throws SQLException {
		if(CORE_DB_ID.equals(etl.getTo_db())) {
			SqlSessionTemplate s = z.dbs.get(CORE_DB_ID);
			return s.getConfiguration().getEnvironment().getDataSource().getConnection();
		}
		return DBUtil.getConnection(to_db);
	}

	/**
	 * Deletes all rows of the target table when the task is flagged with
	 * {@code is_clear_to_db_table = "1"}. Works on its own connection and commits on its own.
	 * Failures are logged through {@code z.Log} and do not stop the caller (best effort).
	 *
	 * @param to_db target database definition
	 * @param etl   task definition
	 */
	private void DeleteToTableData(z_db to_db, z_etl_in etl) {
		if(!"1".equals(etl.getIs_clear_to_db_table())) {
			return;
		}

		Connection conn;
		try {
			conn = openTargetConnection(to_db, etl);
		} catch (SQLException e) {
			z.Log("连接核心数据库出错|"+StringUtil.ExceptionToString(e));
			return;
		}
		if(!z.isNotNull(conn)) {
			return;
		}

		PreparedStatement st = null;
		try {
			conn.setAutoCommit(false);
			st = conn.prepareStatement("delete from "+etl.getTo_db_table());
			int num = st.executeUpdate();
			z.Log("清空目标表的数据|"+num+"条数据");
			conn.commit();
		} catch (Exception e) {
			z.Log("执行清空目标表的数据出错：【"+StringUtil.ExceptionToString(e));
			try {
				conn.rollback();
			} catch (SQLException e1) {
				z.Log("数据回滚出错");
			}
		} finally {
			// Null-safe: st is still null when prepareStatement itself failed.
			closeQuietly(st);
			closeQuietly(conn);
		}
	}

	/**
	 * Closes a statement, logging instead of throwing; does nothing for null.
	 *
	 * @param st statement to close, may be null
	 */
	private void closeQuietly(PreparedStatement st) {
		if(st == null) {
			return;
		}
		try {
			st.close();
		} catch (SQLException e) {
			z.Log("关闭数据库连接出错|"+StringUtil.ExceptionToString(e));
		}
	}

	/**
	 * Closes a connection, logging instead of throwing; does nothing for null.
	 *
	 * @param conn connection to close, may be null
	 */
	private void closeQuietly(Connection conn) {
		if(conn == null) {
			return;
		}
		try {
			conn.close();
		} catch (SQLException e) {
			z.Log("关闭数据库连接出错|"+StringUtil.ExceptionToString(e));
		}
	}

	/**
	 * Saves the result of a run: inserts one {@code z_etl_in_log} record and updates the
	 * "last run" columns of the task itself, both through the core database session.
	 *
	 * @param etlid     id of the task
	 * @param startTime start of the run in epoch milliseconds
	 * @param etllog    description of the result
	 * @param etlcount  number of rows written, as text
	 * @param issuccess whether the run succeeded
	 */
	private void AddLog(String etlid,long startTime,String etllog,String etlcount,boolean issuccess) {
		long endTime = System.currentTimeMillis();
		// Elapsed time in seconds, truncated to four decimals.
		BigDecimal times = (new BigDecimal(endTime).subtract(new BigDecimal(startTime))).divide(new BigDecimal(1000),4, BigDecimal.ROUND_DOWN);
		String successFlag = issuccess ? "1" : "0";

		// Insert the log record.
		z_etl_in_log log = new z_etl_in_log();
		log.setZid(z.newZid("z_etl_in_log"));
		log.setPid(etlid);
		log.setRuntime(DateUtil.FormatDate(new Date(startTime), "yyyy-MM-dd HH:mm:ss"));
		log.setEndtime(DateUtil.FormatDate(new Date(endTime), "yyyy-MM-dd HH:mm:ss"));
		log.setTimelenght(String.valueOf(times));
		log.setEtlcount(etlcount);
		log.setEtllog(etllog);
		log.setIssuccess(successFlag);
		SqlSessionTemplate zs = z.dbs.get(CORE_DB_ID);
		zs.insert("z_etl_in_log_insert", log);

		// Update last run time, row count and result on the task record.
		z_etl_in in = new z_etl_in();
		in.setZid(etlid);
		in.setLast_time(DateUtil.getDateTime());
		in.setLast_etl_count(etlcount);
		in.setLast_log(etllog);
		in.setLast_issuccess(successFlag);
		zs.update("z_etl_in_update_zid", in);
	}

	/**
	 * Reads the rows from the source database by running the task's SQL.
	 * Every column value is converted to its string form; a null value stays null so that
	 * it is written to the target as SQL NULL rather than as the text "null".
	 *
	 * @param etl task definition
	 * @return Result with code SUCCESS and a {@code List<HashMap<String, String>>} as data,
	 *         or code ERROR and a message when the SQL or the session is missing
	 */
	private Result EtlData(z_etl_in etl) {
		Result result = new Result();
		String sql = etl.getSqlinfo();
		if(!z.isNotNull(sql)) {
			result.setCode(Code.ERROR);
			result.setMsg("sqlinfo is null");
			return result;
		}
		SqlSessionTemplate session = getSession(etl.getForm_db());
		if(!z.isNotNull(session)) {
			result.setCode(Code.ERROR);
			result.setMsg("session is null");
			return result;
		}

		z.Log("开始执行数据抽取任务："+etl.getName()+"|获取待抽取数据中.............");
		List<HashMap<String, Object>> tempList = session.selectList("select", sql);
		List<HashMap<String,String>> list = new ArrayList<HashMap<String,String>>();
		z.Log("获取待抽取数据："+tempList.size()+" 条记录");
		for (HashMap<String, Object> map : tempList) {
			HashMap<String,String> m = new HashMap<String, String>();
			for (HashMap.Entry<String, Object> entry : map.entrySet()) {
				Object columnValue = entry.getValue();
				m.put(entry.getKey(), columnValue == null ? null : String.valueOf(columnValue));
			}
			list.add(m);
			// Progress message every 1000 rows.
			if(list.size()%1000==0) {
				z.Log("已抽取："+list.size()+" 条记录");
			}
		}
		z.Log("成功抽取数据："+list.size()+" 条记录");
		result.setCode(Code.SUCCESS);
		result.setData(list);
		return result;
	}

	/**
	 * Inserts the extracted rows into the target table.
	 * One parameterized INSERT is prepared and reused for all rows; values are bound with
	 * {@code setString}, so quotes or other special characters in the data cannot break or
	 * alter the statement. Table and column names come from the task configuration and are
	 * still concatenated, because identifiers cannot be bound. The statement is always closed.
	 * Commit and rollback are left to the caller.
	 *
	 * @param etl  task definition
	 * @param list extracted rows
	 * @param conn open connection to the target database
	 * @return Result with code SUCCESS and the number of inserted rows as message, or code
	 *         ERROR and a description when an insert did not affect exactly one row
	 * @throws SQLException if preparing or executing the statement fails
	 */
	private Result InsertData(z_etl_in etl,List<HashMap<String, String>> list,Connection conn) throws SQLException {
		Result result = new Result();
		List<z_etl_in_detail> detail_list = etl.getZ_etl_in_detail_list();
		int columnCount = detail_list.size();

		// The statement text depends only on the column mapping, so build it once.
		boolean[] oracleDate = new boolean[columnCount];
		StringBuilder columns = new StringBuilder();
		StringBuilder placeholders = new StringBuilder();
		for (int i = 0; i < columnCount; i++) {
			z_etl_in_detail etld = detail_list.get(i);
			oracleDate[i] = isOracleDateColumn(etld, etl);
			if(i>0) {
				columns.append(",");
				placeholders.append(",");
			}
			columns.append(etld.getTo_columnid()).append("\r\n");
			placeholders.append(oracleDate[i] ? "to_date(?,'YYYY-MM-DD HH24:MI:SS')" : "?").append("\r\n");
		}
		String sql = "insert into "+etl.getTo_db_table()+"(\r\n"+columns+")values(\r\n"+placeholders+")\r\n";

		PreparedStatement st = null;
		int inserted = 0;
		try {
			st = conn.prepareStatement(sql);
			for (HashMap<String, String> row : list) {
				for (int i = 0; i < columnCount; i++) {
					String value = row.get(detail_list.get(i).getForm_columnid());
					// JDBC parameters are 1-based.
					st.setString(i+1, oracleDate[i] ? toOracleDateText(value) : value);
				}
				int num = st.executeUpdate();
				if(num!=1) {
					result.setCode(Code.ERROR);
					result.setMsg("Insert返回不为1|【"+sql+"】");
					return result;
				}
				inserted++;
			}
		} finally {
			closeQuietly(st);
		}

		result.setCode(Code.SUCCESS);
		result.setMsg(String.valueOf(inserted));
		return result;
	}

	/**
	 * Tells whether the mapped target column is a date/timestamp column of an Oracle
	 * database, in which case the value has to be wrapped in {@code to_date(...)}.
	 * Every other column (core database, other database types, unknown column) is bound as
	 * a plain string parameter.
	 *
	 * @param etld column mapping
	 * @param etl  task definition
	 * @return true if the value must go through {@code to_date}
	 */
	private boolean isOracleDateColumn(z_etl_in_detail etld, z_etl_in etl) {
		z_db todb = z.dbsMap.get(etl.getTo_db());
		if(!z.isNotNull(todb) || CORE_DB_ID.equals(todb.getDbid())) {
			return false;
		}
		if(!"oracle".equals(todb.getDb_type())) {
			return false;
		}
		z_db_table_column column = z.dbTableColumnMap.get(todb.getDbid()+"_"+etl.getTo_db_table()+"_"+etld.getTo_columnid());
		if(!z.isNotNull(column) || column.getColumn_type() == null) {
			return false;
		}
		String column_type = column.getColumn_type().toLowerCase();
		return column_type.indexOf("date")>=0 || column_type.indexOf("timestamp")>=0;
	}

	/**
	 * Converts a source value to the text passed to Oracle's {@code to_date}.
	 * Null stays null ({@code to_date(NULL, ...)} yields NULL).
	 *
	 * @param value source value, may be null
	 * @return text produced by {@code DateUtil.FormatStringDate(value, null)} — presumably in
	 *         {@code yyyy-MM-dd HH:mm:ss} form to match the to_date mask; confirm against DateUtil
	 */
	private String toOracleDateText(String value) {
		if(value == null) {
			return null;
		}
		Object formatted = DateUtil.FormatStringDate(value, null);
		return formatted == null ? null : String.valueOf(formatted);
	}

	/**
	 * Returns the MyBatis session for a database id, falling back to the core database
	 * when no id is given.
	 *
	 * @param dbid database id, may be null
	 * @return the registered session, or null if none is registered under that id
	 */
	private SqlSessionTemplate getSession(String dbid) {
		if(z.isNull(dbid)) {
			return z.dbs.get(CORE_DB_ID);
		}
		return z.dbs.get(dbid);
	}

}
